跳到主要内容

Go 并发控制神器之 Context

为什么需要 Context ?

一个请求可能会在多个 goroutine 中去处理,所以如果其中一个 goroutine 超时了或者中断了,那这个 Request 就应该被立即停止结束,而不是一直等待。

所以,每个长请求都应该有个超时限制,一个长请求需要一个 Context 来保证整个请求都是在同一个生命周期内。有点“一荣俱荣,一损俱损”的味道。

所以任何可能被阻塞,或者需要很长时间来完成的,都应该有个 context.Context

  • rpc 调用
  • 长请求,长链路/多函数调用

Context 的使用 ⭐

Context 接口

Context 只定义了接口,凡是实现该接口的类都可称为是一种 context。

type Context interface {
Deadline() (deadline time.Time, ok bool)
Done() <-chan struct{}
Err() error
Value(key interface{}) interface{}
}
  • 「Deadline」 方法:可以获取设置的截止时间,返回值 deadline 是截止时间,到了这个时间,Context 会自动发起取消请求,返回值 ok 表示是否设置了截止时间。
  • 「Done」 方法:返回一个只读的 channel ,类型为 struct{}。如果这个 chan 可以读取,说明已经发出了取消信号,可以做清理操作,然后退出协程,释放资源。
  • 「Err」 方法:返回 Context 被取消的原因。
  • 「Value」 方法:获取 Context 上绑定的值,是一个键值对,通过 key 来获取对应的值。

创建 Context

context 包主要提供了两种方式创建 context:

context.Backgroud()
context.TODO()

这两个函数其实只是互为别名,没有差别,官方给的定义是:

  • context.Background 是上下文的默认值,所有其他的上下文都应该从它衍生(Derived)出来。
  • context.TODO 应该只在不确定应该使用哪种上下文时使用;

所以在大多数情况下,我们都使用 context.Background 作为起始的上下文向下传递。

上面的两种方式是创建根 context,不具备任何功能,具体实践还是要依靠 context 包提供的 With 系列函数来进行派生:

func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
func WithValue(parent Context, key, val interface{}) Context

这四个函数都要基于父 Context 衍生,通过这些函数,就创建了一颗 Context 树,树的每个节点都可以有任意多个子节点,节点层级可以有任意多个,画个图表示一下:

基于一个父 Context 可以随意衍生,其实这就是一个 Context 树,树的每个节点都可以有任意多个子节点,节点层级可以有任意多个,每个子节点都依赖于其父节点,例如上图,我们可以基于 Context.Background 衍生出四个子 context:ctx1.0-cancelctx2.0-deadlinectx3.0-timeoutctx4.0-withvalue,这四个子 context 还可以作为父 context 继续向下衍生,即使其中 ctx1.0-cancel 节点取消了,也不影响其他三个父节点分支。

创建 context 方法和 context 的衍生方法就这些,下面我们就一个一个来看一下他们如何被使用。

WithValue 携带数据

如果需要往子协程中传递参数,可以使用 context.WithValue(),相当于创建一个全局 “Map”,被传递 Context 的子 goroutine 都可以从这个 “Map” 中取得数据

我们日常在业务开发中都希望能有一个 trace_id 能串联所有的日志,这就需要我们打印日志时能够获取到这个 trace_id,在 java 中我们可以用 ThreadLocal 来传递,在 Go 语言中我们就可以使用 Context 来传递,通过使用 WithValue 来创建一个携带 trace_id 的 context,然后不断透传下去,打印日志时输出即可,来看使用例子:

import (
"context"
"fmt"
"github.com/google/uuid"
"strings"
"time"
)

const (
KEY = "trace_id"
)

func NewRequestID() string {
return strings.Replace(uuid.New().String(), "-", "", -1)
}

func NewContextWithTraceID() context.Context {
// WithValue 传递三个参数 context、key、value
ctx := context.WithValue(context.Background(), KEY, NewRequestID())
return ctx
}

// 打印日志
func PrintLog(ctx context.Context, message string) {
fmt.Printf("%s|info|trace_id=%s|%s",
time.Now().Format("2006-01-02 15:04:05"),
GetContextValue(ctx, KEY), message)
}

// 从 context 取得 value
func GetContextValue(ctx context.Context, k string) string {
v, ok := ctx.Value(k).(string)
if !ok {
return ""
}
return v
}

func ProcessEnter(ctx context.Context) {
PrintLog(ctx, "测试日志")
}

func main() {
ProcessEnter(NewContextWithTraceID())
}

输出结果

2021-11-15 19:08:19|info|trace_id=3fded7ea13f9403e8921506b62cc5953|测试日志

基于 context.Background 创建一个携带 trace_id 的 ctx,然后通过 context 树一起传递,从中派生的任何 context 都会获取此值,我们最后打印日志的时候就可以从 ctx 中取值输出到日志中。目前一些 RPC 框架都是支持了 Context,所以 trace_id 的向下传递就更方便了。

在使用 withValue 时要注意四个事项:

1、不建议使用 context 值传递关键参数,关键参数应该显示的声明出来,不应该隐式处理,context 中最好是携带签名、trace_id 这类值。

2、因为携带 value 也是 key、value 的形式,为了避免 context 因多个包同时使用 context 而带来冲突,key 建议采用内置类型。

3、上面的例子我们获取 trace_id 是直接从当前 ctx 获取的,实际我们也可以获取父 context 中的 value,在获取键值对是,我们先从当前 context 中查找,没有找到会在从父 context 中查找该键对应的值直到在某个父 context 中返回 nil 或者查找到对应的值。

4、context 传递的数据中 key、value 都是 interface 类型,这种类型编译期无法确定类型,所以不是很安全,所以在类型断言时别忘了保证程序的健壮性。

withCancel 取消控制

一个goroutine启动后,我们是无法控制他的,大部分情况是等待它自己结束,那么如果这个 goroutine 是一个不会自己结束的后台 goroutine 呢?比如监控等,会一直运行的。

这种情况化,一直傻瓜式的办法是全局变量,其他地方通过修改这个变量完成结束通知,然后后台 goroutine 不停的检查这个变量,如果发现被通知关闭了,就自我结束。

这种方式也可以,但是首先我们要保证这个变量在多线程下的安全,基于此可以使用 context.WithCancel

context.WithCancel 函数能够从 context.Context 中衍生出一个新的子上下文并返回用于取消该上下文的函数。一旦我们执行返回的取消函数,当前上下文以及它的子上下文都会被取消,所有的 Goroutine 都会同步收到这一取消信号。(注意!!是收到取消信号,而具体是否取消协成要自己处理

它会返回一个 cancelFunc 方法,通过调用这个方法可以发送一个 Done 信号

来看一个例子:

func main()  {
ctx,cancel := context.WithCancel(context.Background())
go Speak(ctx)
time.Sleep(10*time.Second)
cancel()
time.Sleep(1*time.Second)
}

func Speak(ctx context.Context) {
for range time.Tick(time.Second){
select {
case <- ctx.Done():
fmt.Println("我要闭嘴了")
return
default:
fmt.Println("balabalabalabala")
}
}
}

运行结果:

balabalabalabala
....省略
balabalabalabala
我要闭嘴了

我们使用 withCancel 创建一个基于 Background 的 ctx,然后启动一个讲话程序,每隔 1s 说一话,main 函数在 10s 后执行 cancel,那么 speak 检测到取消信号就会退出。

这里要记的一个坑,就是:往从请求入口透传的调用链路中的 context 是携带超时时间的,如果我们想在其中单独开一个 goroutine 去处理其他的事情并且不会随着请求结束后而被取消的话,那么传递的 context 要基于 context.Background 或者 context.TODO 重新衍生一个传递,否决就会和预期不符合了,可以看一篇踩坑文章:context使用不当引发的一个bug

如下,如果根节点 cancel()(取消)了则会导致下面的 context 全部取消(发送 Done 信号)

withTimeout 超时控制

通常健壮的程序都是要设置超时时间的,避免因为服务端长时间响应消耗资源,所以一些 web 框架或 rpc 框架都会采用 withTimeout 或者 withDeadline 来做超时控制,当一次请求到达我们设置的超时时间,就会及时取消,不在往下执行。

withTimeoutwithDeadline 作用是一样的,就是传递的时间参数不同而已,他们都会 通过传入的时间来自动取消 Context,这里要注意的是他们都会返回一个 cancelFunc 方法,通过调用这个方法可以达到提前进行取消,不过在使用的过程还是建议在自动取消后也调用 cancelFunc 去停止定时减少不必要的资源浪费。

withTimeoutWithDeadline 不同在于 WithTimeout 将持续时间作为参数输入而不是时间对象,这两个方法使用哪个都是一样的,看业务场景和个人习惯了,因为本质 withTimout 内部也是调用的 WithDeadline。

现在我们就举个例子来试用一下超时控制,现在我们就模拟一个请求写两个例子:

1、达到超时时间,终止接下来的执行

func main()  {
HttpHandler()
}

func NewContextWithTimeout() (context.Context,context.CancelFunc) {
return context.WithTimeout(context.Background(), 3 * time.Second)
}

func HttpHandler() {
ctx, cancel := NewContextWithTimeout()
defer cancel()
deal(ctx)
}

func deal(ctx context.Context) {
for i:=0; i< 10; i++ {
time.Sleep(1*time.Second)
select {
case <- ctx.Done(): // 用来接收 cancel() 执行后发送的退出信号
fmt.Println(ctx.Err()) // 执行
return
default:
fmt.Printf("deal time is %d\n", i)
}
}
}

输出:

deal time is 0
deal time is 1
context deadline exceeded

2、没有达到超时时间,终止接下来的执行

func main()  {
HttpHandler1()
}

func NewContextWithTimeout1() (context.Context,context.CancelFunc) {
return context.WithTimeout(context.Background(), 3 * time.Second)
}

func HttpHandler1() {
ctx, cancel := NewContextWithTimeout1()
defer cancel()
deal1(ctx, cancel)
}

func deal1(ctx context.Context, cancel context.CancelFunc) {
for i:=0; i< 10; i++ {
time.Sleep(1*time.Second)
select {
case <- ctx.Done(): // 用来接收 cancel() 执行后发送的退出信号
fmt.Println(ctx.Err())
return
default: // 执行
fmt.Printf("deal time is %d\n", i)
cancel()
}
}
}

输出:

deal time is 0
context canceled

使用起来还是比较容易的,既可以超时自动取消,又可以手动控制取消。

自定义 Context 🚧

因为 Context 本质是一个接口,所以我们可以通过实现 Context 达到自定义 Context 的目的,一般在实现 Web 框架或 RPC 框架往往采用这种形式,比如 gin 框架的 Context 就是自己有封装了一层。

源码赏析

Context 其实就是一个接口,定义了四个方法:

type Context interface {
// 当 Context 自动取消或者到了取消时间被取消后返回
Deadline() (deadline time.Time, ok bool)

// 当 Context 被取消或者到了 deadline 返回一个被关闭的 channel
Done() <-chan struct{}

// 当 Context 被取消或者关闭后,返回 context 取消的原因
Err() error

// 获取设置的 key 对应的值
Value(key interface{}) interface{}
}

这个接口主要被三个类继承实现,分别是 emptyCtx、ValueCtx、cancelCtx,采用匿名接口的写法,这样可以对任意实现了该接口的类型进行重写。

下面我们就从创建到使用来层层分析。

创建根 Context

其在我们调用 context.Backgroundcontext.TODO 时创建的对象就是 empty:

var (
background = new(emptyCtx)
todo = new(emptyCtx)
)

func Background() Context {
return background
}

func TODO() Context {
return todo
}

Background 和 TODO 还是一模一样的,官方说:background 它通常由主函数、初始化和测试使用,并作为传入请求的顶级上下文; TODO 是当不清楚要使用哪个 Context 或尚不可用时,代码应使用 context.TODO,后续在在进行替换掉,归根结底就是语义不同而已。

emptyCtx 类

emptyCtx 主要是给我们创建根 Context 时使用的,其实现方法也是一个空结构,实际源代码长这样:

type emptyCtx int

func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
return
}

func (*emptyCtx) Done() <-chan struct{} {
return nil
}

func (*emptyCtx) Err() error {
return nil
}

func (*emptyCtx) Value(key interface{}) interface{} {
return nil
}

func (e *emptyCtx) String() string {
switch e {
case background:
return "context.Background"
case todo:
return "context.TODO"
}
return "unknown empty Context"
}

valueCtx 类

valueCtx 目的就是为 Context 携带键值对,因为它采用匿名接口的继承实现方式,他会继承父 Context,也就相当于嵌入 Context 当中了

type valueCtx struct {
Context // 匿名字段
key, val interface{}
}

实现了 String 方法输出 Context 和携带的键值对信息:

func (c *valueCtx) String() string {
return contextName(c.Context) + ".WithValue(type " +
reflectlite.TypeOf(c.key).String() +
", val " + stringify(c.val) + ")"
}

实现 Value 方法来存储键值对:

func (c *valueCtx) Value(key interface{}) interface{} {
if c.key == key {
return c.val
}
return c.Context.Value(key)
}

看图来理解一下:

所以我们在调用 Context 中的 Value 方法时会层层向上调用直到最终的根节点,中间要是找到了 key 就会返回,否会就会找到最终的 emptyCtx 返回 nil。

WithCancel 的实现

来看一下 WithCancel 的入口函数源代码:

func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
if parent == nil {
panic("cannot create context from nil parent")
}
c := newCancelCtx(parent)
propagateCancel(parent, &c)
return &c, func() { c.cancel(true, Canceled) }
}

这个函数执行步骤如下:

  • 创建一个 cancelCtx 对象,作为子 context
  • 然后调用 propagateCancel 构建父子 context 之间的关联关系,这样当父 context 被取消时,子 context 也会被取消。
  • 返回子 context 对象和子树取消函数

我们先分析一下 cancelCtx 这个类。

cancelCtx 类

cancelCtx 继承了 Context,也实现了接口 canceler

type cancelCtx struct {
Context
// 就是一个互斥锁,保证并发安全的,所以 context 是并发安全的
mu sync.Mutex

// 用来做 context 的取消通知信号,之前的版本使用的是chan struct{}类型,现在用atomic.Value做锁优化
done atomic.Value

// key 是接口类型 canceler,目的就是存储实现当前canceler接口的子节点,当根节点发生取消时,遍历子节点发送取消信号
children map[canceler]struct{}

// 当 context 取消时存储取消信息
err error
}

这里实现了 Done 方法,返回的是一个只读的 channel,目的就是我们在外部可以通过这个阻塞的 channel 等待通知信号。

propagateCancel 方法

看 propagateCancel 是如何做构建父子 Context 之间的关联。

func propagateCancel(parent Context, child canceler) {
// 如果返回nil,说明当前父`context`从来不会被取消,是一个空节点,直接返回即可。
done := parent.Done()
if done == nil {
return // parent is never canceled
}

// 提前判断一个父context是否被取消,如果取消了也不需要构建关联了,
// 把当前子节点取消掉并返回
select {
case <-done:
// parent is already canceled
child.cancel(false, parent.Err())
return
default:
}

// 这里目的就是找到可以“挂”、“取消”的context
if p, ok := parentCancelCtx(parent); ok {
p.mu.Lock()
// 找到了可以“挂”、“取消”的context,但是已经被取消了,那么这个子节点也不需要
// 继续挂靠了,取消即可
if p.err != nil {
child.cancel(false, p.err)
} else {
// 将当前节点挂到父节点的childrn map中,外面调用cancel时可以层层取消
if p.children == nil {
// 这里因为childer节点也会变成父节点,所以需要初始化map结构
p.children = make(map[canceler]struct{})
}
p.children[child] = struct{}{}
}
p.mu.Unlock()
} else {
// 没有找到可“挂”,“取消”的父节点挂载,那么就开一个goroutine
atomic.AddInt32(&goroutines, +1)
go func() {
select {
case <-parent.Done():
child.cancel(false, parent.Err())
case <-child.Done():
}
}()
}
}

cancel 方法

最后我们再来看一下返回的 cancel 方法是如何实现,这个方法会关闭上下文中的 Channel 并向所有的子上下文同步取消信号:

func (c *cancelCtx) cancel(removeFromParent bool, err error) {
// 取消时传入的error信息不能为nil, context定义了默认error:var Canceled = errors.New("context canceled")
if err == nil {
panic("context: internal error: missing cancel error")
}
// 已经有错误信息了,说明当前节点已经被取消过了
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
return // already canceled
}

c.err = err
// 用来关闭channel,通知其他协程
d, _ := c.done.Load().(chan struct{})
if d == nil {
c.done.Store(closedchan)
} else {
close(d)
}
// 当前节点向下取消,遍历它的所有子节点,然后取消
for child := range c.children {
// NOTE: acquiring the child's lock while holding parent's lock.
child.cancel(false, err)
}
// 节点置空
c.children = nil
c.mu.Unlock()
// 把当前节点从父节点中移除,只有在外部父节点调用时才会传true
// 其他都是传false,内部调用都会因为c.children = nil被剔除出去
if removeFromParent {
removeChild(c.Context, c)
}
}

Context 使用原则

  • Context 不要放在结构体中,需要以参数方式传递
  • Context 作为函数参数时,要放在第一位,作为第一个参数
  • 使用 context.Background 函数生成根节点的 Context
  • Context 要传值必要的值,不要什么都传
  • Context 是多协程安全的,可以在多个协程中使用

Reference